"""
Case Type   : USTORE
Case Name   : 验证开启逻辑解码插入int类型的数据
Create At   : 2025/4/7
Owner       : wan005
Description :
    1.查看wal_level，enable_slot_log，enable_default_ustore_table的初始值
    2.修改wal_level=logical;enable_slot_log=on；enable_default_ustore_table=on
    3.配置白名单用户
    4.主机创建逻辑复制槽
    5.建表，插入数据
    6.执行逻辑复制槽流式解码
    7.检查生成的文件
Expect      :
    1.初始值分别对应为hot_standby和off，offshow wal_level;show enable_slot_log
    2.修改成功
    3.配置完成
    4.创建成功
    5.建表成功
    6.解码无报错
    7.文件生成成功
History     :
"""
import os
import unittest
from yat.test import Node
from yat.test import macro
from testcase.utils.Common import Common
from testcase.utils.CommonSH import CommonSH
from testcase.utils.Logger import Logger
from testcase.utils.Constant import Constant
from testcase.utils.ComThread import ComThread


class Security(unittest.TestCase):
    def setUp(self):
        self.logger = Logger()
        self.logger.info(f'---{os.path.basename(__file__)} start---')
        self.userNode = Node('PrimaryDbUser')
        self.sh_primy = CommonSH('PrimaryDbUser')
        self.constant = Constant()
        self.common = Common()
        self.slotname = 's_replication_016'
        self.table = 't_replication_016'
        self.config = os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')
        self.config_bak = os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba_bak.conf')
        self.decode_file = os.path.join(macro.DB_INSTANCE_PATH, 'deco_file.txt')

    def test_ustore(self):
        text = '-----pre1:备份pg_hba.conf文件;expect:成功-----'
        self.logger.info(text)
        cp_cmd = f"cp {self.config} {self.config_bak}"
        self.logger.info(cp_cmd)
        cp_result = self.userNode.sh(cp_cmd).result()
        self.logger.info(cp_result)
        
        text = '-----step1:查询参数默认值;expect:成功-----'
        self.logger.info(text)
        res1_1 = self.common.show_param('wal_level')
        self.assertEqual('hot_standby', res1_1, text + '执行失败')
        res1_2 = self.common.show_param('enable_slot_log')
        self.assertEqual('off', res1_2, text + '执行失败')
        res1_3 = self.common.show_param('enable_default_ustore_table')
        self.assertEqual('off', res1_3, text + '执行失败')

        text = '----step2:修改参数值;expect:成功----'
        self.logger.info(text)
        self.sh_primy.execute_gsguc('set', self.constant.GSGUC_SUCCESS_MSG,
                                    'wal_level=logical')
        self.sh_primy.restart_db_cluster()
        res2_1 = self.common.show_param('wal_level')
        self.assertEqual('logical', res2_1, text + '执行失败')
        self.sh_primy.execute_gsguc('reload', self.constant.GSGUC_SUCCESS_MSG,
                                    'enable_slot_log=on')
        res2_2 = self.common.show_param('enable_slot_log')
        self.assertEqual('on', res2_2, text + '执行失败')
        self.sh_primy.execute_gsguc('reload', self.constant.GSGUC_SUCCESS_MSG,
                                    'enable_default_ustore_table=on')
        res2_3 = self.common.show_param('enable_default_ustore_table')
        self.assertEqual('on', res2_3, text + '执行失败')

        text = '----step3:配置白名单;expect:成功----'
        self.logger.info(text)
        param_1 = f'local replication {self.userNode.db_user}  trust'
        param_2 = f'host replication {self.userNode.db_user} 127.0.0.1/32 trust'
        param_3 = f'host replication {self.userNode.db_user} ::1/128 trust'
        self.sh_primy.execute_gsguc('reload', self.constant.GSGUC_SUCCESS_MSG,
                                    'enable_default_ustore_table=on')
        res3_1 = self.sh_primy.execute_gsguc('reload',
                    self.constant.GSGUC_SUCCESS_MSG, '', pghba_param=param_1)
        self.assertTrue(res3_1, text + '执行失败')
        res3_2 = self.sh_primy.execute_gsguc('reload',
                    self.constant.GSGUC_SUCCESS_MSG, '', pghba_param=param_2)
        self.assertTrue(res3_2, text + '执行失败')
        res3_3 = self.sh_primy.execute_gsguc('reload',
                    self.constant.GSGUC_SUCCESS_MSG, '', pghba_param=param_3)
        self.assertTrue(res3_3, text + '执行失败')

        text = '----step4:主机创建逻辑复制槽;expect:成功----'
        self.logger.info(text)
        res4 = self.sh_primy.exec_pg_recvlogical('--create',
                                    self.userNode.db_name, self.slotname,
                                    self.userNode.db_user, '-P mppdb_decoding',
                                    passwd=self.userNode.db_password)
        self.assertNotIn('FATAL', res4, text + '执行失败')
        self.assertNotIn('ERROR', res4, text + '执行失败')

        text = '----step5:建表，插入int类型的数据;expect:成功----'
        self.logger.info(text)
        sql_cmd5 = f'drop table if exists {self.table};' \
                f'create table {self.table}(a int) with(storage_type=ustore);' \
                f'insert into {self.table} values(100);'
        res5 = self.sh_primy.execut_db_sql(sql_cmd5)
        self.logger.info(res5)
        self.assertIn('CREATE TABLE', res5, text + '执行失败')

        text = '----step6:执行逻辑复制槽流式解码;expect:解码无报错----'
        self.logger.info(text)
        thread01 = ComThread(self.sh_primy.exec_pg_recvlogical,
                            args=('--start', self.userNode.db_name,
                            self.slotname, self.userNode.db_user,
                            f'-f {self.decode_file} -s 2 -v -P mppdb_decoding',
                            self.userNode.db_password))
        thread01.setDaemon(True)
        thread01.start()
        thread01.join(10)
        self.logger.info(thread01.get_result())

        text = '----step7:文件生成;expect:可查看到信息----'
        self.logger.info(text)
        cmd7 = f"cat {self.decode_file};"
        self.logger.info(cmd7)
        res7 = self.userNode.sh(cmd7).result()
        self.logger.info(res7)
        self.assertIn(self.table, res7, text + '执行失败')

    def tearDown(self):
        text = '-----step1:清理环境；expect:成功-----'
        self.logger.info(text)
        text = "-----停止逻辑复制槽-----"
        self.logger.info(text)
        stop_cmd = "ps -ef | grep  pg_recvlogical | grep -v grep | " \
                   "awk '{{print $2}}' | xargs kill -9"
        self.logger.info(stop_cmd)
        stop_res = self.userNode.sh(stop_cmd).result()
        self.logger.info(stop_res)
        self.logger.info('-----恢复配置文件-----')
        clean_cmd1 = f"cp -f {self.config_bak} {self.config};" \
                     f"rm -rf {self.config_bak};"
        self.logger.info(clean_cmd1)
        clean_res1 = self.userNode.sh(clean_cmd1).result()
        self.logger.info(clean_res1)
        text = '----删除逻辑复制槽----'
        self.logger.info(text)
        del_res = self.sh_primy.exec_pg_recvlogical('--drop',
                                        self.userNode.db_name, self.slotname,
                                        self.userNode.db_user,
                                        passwd=self.userNode.db_password)
        self.assertNotIn('FATAL', del_res, text + '执行失败')
        self.assertNotIn('ERROR', del_res, text + '执行失败')
        self.logger.info('-----恢复配参数配置-----')
        self.sh_primy.execute_gsguc('set', self.constant.GSGUC_SUCCESS_MSG,
                                    'wal_level=hot_standby')
        self.sh_primy.restart_db_cluster()
        clean_res2 = self.common.show_param('wal_level')
        self.assertEqual('hot_standby', clean_res2, text + '执行失败')
        self.sh_primy.execute_gsguc('reload', self.constant.GSGUC_SUCCESS_MSG,
                                    'enable_slot_log=off')
        clean_res3 = self.common.show_param('enable_slot_log')
        self.assertEqual('off', clean_res3, text + '执行失败')
        self.sh_primy.execute_gsguc('reload', self.constant.GSGUC_SUCCESS_MSG,
                                    'enable_default_ustore_table=off')
        clean_res4 = self.common.show_param('enable_default_ustore_table')
        self.assertEqual('off', clean_res4, text + '执行失败')
        self.logger.info('-----删除表数据-----')
        clean_cmd5 = f'drop table if exists {self.table};'
        clean_res5 = self.sh_primy.execut_db_sql(clean_cmd5)
        self.logger.info(clean_res5)
        self.assertIn(self.constant.DROP_TABLE_SUCCESS, clean_res5,
                      text + '执行失败')
        self.logger.info(f'---{os.path.basename(__file__)} end---')
